package com.erf.client;

import com.erf.exception.NoConnectException;
import com.erf.handler.client.SubmitRequestResponseHandler;
import com.erf.initalizer.ClientChannelInitializer;
import com.erf.message.SubmitRequest;
import com.erf.message.SubmitRequestResponse;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.SocketUtils;
import lombok.extern.slf4j.Slf4j;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;

/**
 * @author zhangzy
 * @date 2022/7/5
 */
@Slf4j
public class RpcClient {
    private final Bootstrap bootstrap;
    private static ConcurrentHashMap<String, ArrayList<Channel>> userChannelMap = new ConcurrentHashMap<>();

    private RpcClient() {
        bootstrap = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ClientChannelInitializer());
    }

    /**
     * @param host 对向连接地址,逗号分割
     * @param port 对象连接端口
     * @return ChannelFuture连接对象，当存在多个连接时将返回最后一个连接对象。
     */
    public ChannelFuture connect(String userName, String host, int port) {
        return connect(userName, host, port, 1, -1);
    }

    /**
     * @param host 对向连接地址,逗号分割
     * @param port 对象连接端口
     * @param num  连接数
     * @return ChannelFuture连接对象，当存在多个连接时将返回最后一个连接对象。
     */
    public ChannelFuture connect(String userName, String host, int port, int num) {
        return connect(userName, host, port, num, -1);
    }

    /**
     * @param host      对向连接地址,逗号分割
     * @param port      对象连接端口
     * @param num       连接数
     * @param localPort 本地端口开启地址
     * @return ChannelFuture连接对象，当存在多个连接时将返回最后一个连接对象。
     */
    public synchronized ChannelFuture connect(String userName, String host, int port, int num, int localPort) {
        ChannelFuture future = null;
        for (int i = 0; i < num; i++) {
            if (localPort == -1) {
                future = bootstrap.connect(SocketUtils.socketAddress(host, port));
            } else {
                future = bootstrap.connect(SocketUtils.socketAddress(host, port), SocketUtils.socketAddress("localhost", localPort));
            }
            ChannelFuture finalFuture = future;
            future.addListener((GenericFutureListener<ChannelFuture>) f -> {
                if (!f.isSuccess()) {
                    log.error("Connect to {}:{} failed. cause by {}.", host, port, f.cause().getMessage());
                } else {
                    Channel channel = finalFuture.channel();
                    ArrayList<Channel> channels = userChannelMap.get(userName);
                    if (channels == null) {
                        ArrayList<Channel> tmp = new ArrayList<>();
                        tmp.add(channel);
                        userChannelMap.put(userName, tmp);
                    } else {
                        channels.add(channel);
                    }
                }
            });
        }
        return future;
    }

    private static final SecureRandom RANDOM = new SecureRandom();

    public static SubmitRequestResponse writeRandom(String userName, SubmitRequest submitRequest) throws NoConnectException, InterruptedException {
        ArrayList<Channel> channels = userChannelMap.get(userName);
        if (channels == null) {
            throw new NoConnectException(userName);
        } else {
            Channel channel = channels.get(RANDOM.nextInt(channels.size()));
            channel.writeAndFlush(submitRequest);
            DefaultPromise<SubmitRequestResponse> promise = new DefaultPromise<>(channel.eventLoop());
            SubmitRequestResponseHandler.PROMISES.put(submitRequest.getSequence(), promise);
            promise.sync();
            try {
                return promise.get();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        return null;
    }


    private static class SingletonHolder {
        private static final RpcClient INSTANCE = new RpcClient();
    }

    public static RpcClient getInstance() {
        return SingletonHolder.INSTANCE;
    }

}
